Redis实战

您所在的位置:网站首页 java 发布订阅 Redis实战

Redis实战

2024-07-13 20:55:55| 来源: 网络整理| 查看: 265

借鉴:https://blog.csdn.net/canot/article/details/51938955

 

1.什么是pub/sub

Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。 同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,Redis作为一个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能。

2.Redis pub/sub的实现

Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。

客户端1订阅CCTV1:

127.0.0.1:6379> subscribe CCTV1 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "CCTV1" 3) (integer) 1

 

 

客户端2订阅CCTV1和CCTV2:

127.0.0.1:6379> subscribe CCTV1 CCTV2 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "CCTV1" 3) (integer) 1 1) "subscribe" 2) "CCTV2" 3) (integer) 2

 

此时这两个客户端分别监听这指定的频道。现在另一个客户端向服务器推送了关于这两个频道的信息。

127.0.0.1:6379> publish CCTV1 "cctv1 is good" (integer) 2 //返回2表示两个客户端接收了次消息。被接收到消息的客户端如下所示。 1) "message" 2) "CCTV1" 3) "cctv1 is good" ---- 1) "message" 2) "CCTV1" 3) "cctv1 is good"

如上的订阅/发布也称订阅发布到频道(使用publish与subscribe命令),此外还有订阅发布到模式(使用psubscribe来订阅一个模式)

订阅CCTV的全部频道

127.0.0.1:6379> psubscribe CCTV* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "CCTV*" 3) (integer) 1

当依然先如上推送一个CCTV1的消息时,该客户端正常接收。

3.Pub/Sub在java中的实现

导入Redis驱动:

redis.clients jedis 2.9.0

 

Redis驱动包提供了一个抽象类:JedisPubSub…继承这个类就完成了对客户端对订阅的监听。示例代码:

/** * redis发布订阅消息监听器 * @ClassName: RedisMsgPubSubListener * @Description: TODO * @author OnlyMate * @Date 2018年8月22日 上午10:05:35 * */ public class RedisMsgPubSubListener extends JedisPubSub { private Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class); @Override public void unsubscribe() { super.unsubscribe(); } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); } @Override public void subscribe(String... channels) { super.subscribe(channels); } @Override public void psubscribe(String... patterns) { super.psubscribe(patterns); } @Override public void punsubscribe() { super.punsubscribe(); } @Override public void punsubscribe(String... patterns) { super.punsubscribe(patterns); } @Override public void onMessage(String channel, String message) { logger.info("onMessage: channel[{}], message[{}]",channel, message); } @Override public void onPMessage(String pattern, String channel, String message) { logger.info("onPMessage: pattern[{}], channel[{}], message[{}]", pattern, channel, message); } @Override public void onSubscribe(String channel, int subscribedChannels) { logger.info("onSubscribe: channel[{}], subscribedChannels[{}]", channel, subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.info("onPUnsubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { logger.info("onPSubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { logger.info("channel:{} is been subscribed:{}", channel, subscribedChannels); } }

 

如上所示,抽象类中存在的方法。分别表示

监听到订阅模式接受到消息时的回调 (onPMessage) 监听到订阅频道接受到消息时的回调 (onMessage ) 订阅频道时的回调( onSubscribe ) 取消订阅频道时的回调( onUnsubscribe ) 订阅频道模式时的回调 ( onPSubscribe ) 取消订阅模式时的回调( onPUnsubscribe )

运行我们刚刚编写的类:

订阅者

/** * 订阅者 * @ClassName: RedisSubTest * @Description: TODO * @author OnlyMate * @Date 2018年8月23日 下午2:59:42 * */ public class RedisSubTest { @Test public void subjava() { System.out.println("订阅者 "); Jedis jr = null; try { jr = new Jedis("127.0.0.1", 6379, 0);// redis服务地址和端口号 RedisMsgPubSubListener sp = new RedisMsgPubSubListener(); // jr客户端配置监听两个channel jr.subscribe(sp, "news.share", "news.blog"); } catch (Exception e) { e.printStackTrace(); } finally { if (jr != null) { jr.disconnect(); } } } }

发布者

/** * 发布者 * @ClassName: RedisPubTest * @Description: TODO * @author OnlyMate * @Date 2018年8月23日 下午2:59:25 * */ public class RedisPubTest { @Test public void pubjava() { System.out.println("发布者 "); Jedis jr = null; try { jr = new Jedis("127.0.0.1", 6379, 0);// redis服务地址和端口号 // jr客户端配置监听两个channel jr.publish( "news.share", "新闻分享"); jr.publish( "news.blog", "新闻博客"); } catch (Exception e) { e.printStackTrace(); } finally { if (jr != null) { jr.disconnect(); } } } }

 

从代码中我们不难看出,我们声明的一个redis链接在设置监听后就可以执行一些操作,例如发布消息,订阅消息等。。。 当运行上述代码后会在控制台输出:

此时当在有客户端向new.share或者new.blog通道publish消息时,onMessage方法即可被相应。(jedis.publish(channel, message))。

 

4.Pub/Sub在Spring中的实践 导入依赖jar

redis.clients jedis 2.9.0 org.springframework.data spring-data-redis 2.0.8.RELEASE

 核心消息监听器

/** * redis发布订阅消息监听器 * @ClassName: RedisMsgPubSubListener * @Description: TODO * @author OnlyMate * @Date 2018年8月22日 上午10:05:35 * */ public class RedisMsgPubSubListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class); @Override public void onMessage( final Message message, final byte[] pattern ) { RedisSerializer serializer = redisTemplate.getValueSerializer(); // message.getBody()是Redis的值,需要用redis的valueSerializer反序列化 logger.info("Message receive-->pattern:{},message: {},{}", new String(pattern), serializer.deserialize(message.getBody()), redisTemplate.getStringSerializer().deserialize(message.getChannel())); logger.info(message.toString()); JSONObject json = JSONObject.parseObject(serializer.deserialize(message.getBody()).toString()); String cutomerId = json.getString("cutomerId"); //可以与WebSocket结合使用,解决分布式服务中,共享Session if(StringUtils.isNotEmpty(cutomerId)) { logger.info("cutomerId: {},消息:{}", cutomerId, message.toString()); }else { logger.info("cutomerId 为空,无法推送给对应的客户端,消息:{}", message.toString()); } } }

 

 

现在我们在获取RedisTemplate,并给WEB_SOCKET:LOTTERY这个channel publish数据。

/** * 发布者 * @ClassName: RedisMsgPubClient * @Description: TODO * @author OnlyMate * @Date 2018年8月23日 下午3:59:33 * */ @Controller @RequestMapping(value="/redisMsgPubClientBySpring") public class RedisMsgPubClient { private Logger logger = LoggerFactory.getLogger(RedisMsgPubClient.class); @Autowired private RedisTemplate redisTemplate; @RequestMapping @ResponseBody public String pubMsg(HttpServletRequest request, HttpServletResponse response) { String cutomerId = request.getParameter("cutomerId").toString(); String msg = request.getParameter("msg").toString(); logger.info("发布消息:{}", request.getParameter("msg").toString()); JSONObject json = new JSONObject(); json.put("cutomerId", cutomerId); json.put("msg", msg); redisTemplate.convertAndSend("WEB_SOCKET:LOTTERY", json); return "成功"; } }

 

最后一步reids的配置

redis 相关类 Spring 托管

如上的配置即配置了对Redis的链接。在配置类中的将ChannelTopic加入IOC容器。则在Spring启动时会在一个RedisTemplate(一个对Redis的链接)中设置的一个channel,即WEB_SOCKET:LOTTERY。 在上述配置中,RedisMsgPubSubListener是我们生成的,这个类即为核心监听类,RedisTemplate接受到数据如何处理就是在该类中处理的。

附加上Java配置

import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import org.springframework.cache.CacheManager; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.cache.interceptor.KeyGenerator; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.gfss.common.listener.CustomRedisMsgPubSubListener; @Configuration @EnableCaching public class RedisConfiguration extends CachingConfigurerSupport { @Override @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } @Bean public CacheManager cacheManager(RedisTemplate redisTemplate) { return new RedisCacheManager(redisTemplate); } @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer( Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } @Bean public RedisTemplate objectRedisTemplate(RedisConnectionFactory factory) { RedisTemplate template = new RedisTemplate(); template.setConnectionFactory(factory); template.setKeySerializer(new StringRedisSerializer()); return template; } /************** 配置redis发布订阅模式 *******************************/ @Bean public CustomRedisMsgPubSubListener customRedisMsgPubSubListener() { return new CustomRedisMsgPubSubListener(); } @Bean public MessageListenerAdapter messageListenerAdapter(MessageListener messageListener) { return new MessageListenerAdapter(messageListener); } @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) { List collection = new ArrayList(); // 普通订阅,订阅具体的频道 ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET:LOTTERY"); collection.add(channelTopic); /*// 模式订阅,支持模式匹配订阅,*为模糊匹配符 PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*"); collection.add(PatternTopic); // 匹配所有频道 PatternTopic PatternTopicAll = new PatternTopic("*"); collection.add(PatternTopicAll);*/ RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); redisMessageListenerContainer.addMessageListener(messageListenerAdapter, collection); return redisMessageListenerContainer; } }

 

访问页面去调用发布者

http://localhost:8088/redis/redisMsgPubClientBySpring?cutomerId=all&msg=你们好

订阅收到的消息

 

5.拓展开发

  在分布式服务中,可以结合WebSocket与Redis的发布订阅模式相结合,解决session不能共享的问题。

  当业务处理完成之后,通过Redis的发布订阅模式,发布消息到每个订阅该频道的服务节点,然后由每个服务节点通过key寻找自己内存缓存中的session,然后找到了就向客户端推消息,否则不处理。

Dubbo只能传输可序列化的对象,Redis只能缓存可序列化的对象,Dubbo基于网络流(TCP),Redis缓存的数据要存储在硬盘上,而WebSocketSession是没有实现序列化的,所以不能跨服务传递WebSocketSession,也不能使用Redis存储WebSocketSession,只能自定义一块缓存区。

6.动态订阅频道

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.RedisSerializer; import com.alibaba.fastjson.JSONObject; import com.gfss.common.websocket.CustomWebSocketHandler; /** * redis发布订阅消息监听器 * @ClassName: RedisMsgPubSubListener * @Description: TODO * @author OnlyMate * @Date 2018年8月22日 上午10:05:35 * */ public class CustomRedisMsgPubSubListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(CustomRedisMsgPubSubListener.class); @Autowired private CustomWebSocketHandler customWebSocketHandler; @Autowired private ApplicationContext applicationContext; @Autowired private RedisTemplate redisTemplate; /** * 实例: * JSONObject json = new JSONObject(); * json.put("cutomerId", notifyResult.getResult()); * json.put("resultCode", map.get("resultCode")); * //向redis发布消息 * redisTemplate.convertAndSend(channelName, json); * @param message * @param pattern * @Throws * @Author: chetao * @Date: 2019年1月8日 下午10:40:21 * @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[]) */ @Override public void onMessage( final Message message, final byte[] pattern ) { RedisSerializer serializer = redisTemplate.getKeySerializer(); logger.info("Message receive-->pattern:{},message: {},{}", serializer.deserialize(pattern), serializer.deserialize(message.getBody()), serializer.deserialize(message.getChannel())); if ("WEB_SOCKET:PAY_NOTIFY".equals(serializer.deserialize(message.getChannel()))) { RedisMessageListenerContainer redisMessageListenerContainer = applicationContext .getBean("redisMessageListenerContainer", RedisMessageListenerContainer.class); MessageListenerAdapter messageListenerAdapter = applicationContext.getBean("messageListenerAdapter", MessageListenerAdapter.class); /*List collection = new ArrayList(); // 动态添加订阅主题 ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET1:PAY_NOTIFY"); collection.add(channelTopic); PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*"); collection.add(PatternTopic); redisMessageListenerContainer.addMessageListener(messageListenerAdapter, collection);*/ // 动态添加订阅主题 ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET1:PAY_NOTIFY"); redisMessageListenerContainer.addMessageListener(messageListenerAdapter, channelTopic); PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*"); redisMessageListenerContainer.addMessageListener(messageListenerAdapter, PatternTopic); } JSONObject json = JSONObject.parseObject(message.toString()); customWebSocketHandler.sendMessage(json.toJSONString()); } }

上面两种动态订阅频道的方式都可以,本人已测试是可行的,可以结合自己的业务去拓展,如:临时订阅频道后退订频道

 



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭